package com.ruyuan.eshop.promotion.service.impl;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.constants.RedisKey;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformCouponUserBucketMessage;
import com.ruyuan.eshop.common.message.PlatformPromotionMessage;
import com.ruyuan.eshop.common.message.PlatformPromotionUserBucketMessage;
import com.ruyuan.eshop.common.utils.JsonUtil;
import com.ruyuan.eshop.membership.api.AccountApi;
import com.ruyuan.eshop.membership.domain.dto.MembershipAccountDTO;
import com.ruyuan.eshop.promotion.converter.PromotionConverter;
import com.ruyuan.eshop.promotion.dao.SalesPromotionDAO;
import com.ruyuan.eshop.promotion.domain.dto.SaveOrUpdatePromotionDTO;
import com.ruyuan.eshop.promotion.domain.entity.SalesPromotionDO;
import com.ruyuan.eshop.promotion.domain.request.SaveOrUpdatePromotionRequest;
import com.ruyuan.eshop.promotion.mq.event.SalesPromotionCreatedEvent;
import com.ruyuan.eshop.promotion.mq.producer.DefaultProducer;
import com.ruyuan.eshop.promotion.redis.RedisCache;
import com.ruyuan.eshop.promotion.service.PromotionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ruyuan.eshop.common.constants.RedisKey.PROMOTION_CONCURRENCY_KEY;

/**
 * {@link PromotionService} implementation that persists sales promotions and
 * announces them over RocketMQ.
 *
 * @author zhonghuashishan
 */
@Service
@Slf4j
public class PromotionServiceImpl implements PromotionService {

    /**
     * Expiry passed to {@code redisCache.set} for the duplicate-submission marker.
     * Presumably seconds (i.e. 30 minutes) — confirm against RedisCache.set.
     */
    private static final int CONCURRENCY_KEY_EXPIRE = 30 * 60;

    /**
     * Number of consecutive user ids covered by one user-bucket message.
     */
    private static final int USER_BUCKET_SIZE = 1000;

    /**
     * Maximum number of user-bucket messages handed to the producer in one batch call.
     */
    private static final int MESSAGE_BATCH_SIZE = 100;

    /**
     * DAO for sales promotions.
     */
    @Autowired
    private SalesPromotionDAO salesPromotionDAO;

    /**
     * Redis cache helper.
     */
    @Resource
    private RedisCache redisCache;

    /**
     * RocketMQ producer.
     */
    @Resource
    private DefaultProducer defaultProducer;

    /**
     * Converter from requests to promotion entities.
     */
    @Resource
    private PromotionConverter promotionConverter;

    /**
     * Membership account API (Dubbo reference, version 1.0.0).
     */
    @DubboReference(version = "1.0.0")
    private AccountApi accountApi;

    /**
     * Creates or updates a sales promotion.
     *
     * <p>A Redis marker keyed by name, creator, start time and end time is used to
     * reject repeated submissions of the same promotion. The lookup and the later
     * marker write are two separate Redis calls, so two identical requests arriving
     * at the same time can both pass the check.
     *
     * @param request the promotion to save; its start time and end time must be
     *                non-null because both are dereferenced to build the Redis key
     * @return a DTO echoing name, type, serialized rule and creator with
     *         {@code success = true}; {@code null} when a marker for the same
     *         promotion already exists
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) {
        // Reject a repeated submission of the same promotion.
        String concurrencyKey = buildConcurrencyKey(request);
        String existingMarker = redisCache.get(concurrencyKey);
        if (StringUtils.isNotBlank(existingMarker)) {
            return null;
        }

        log.info("活动内容：{}", request);

        // The rule is stored, and returned to the caller, in its JSON form.
        String rule = JsonUtil.object2Json(request.getRule());

        // Build the promotion entity and persist it.
        SalesPromotionDO salesPromotionDO = promotionConverter.convertPromotionDO(request);
        salesPromotionDO.setRule(rule);
        salesPromotionDAO.saveOrUpdatePromotion(salesPromotionDO);

        // NOTE(review): the marker is written before the event is published and is not
        // removed on failure; if publishing throws, the DB write presumably rolls back
        // while the marker keeps rejecting retries until it expires — confirm intended.
        redisCache.set(concurrencyKey, UUID.randomUUID().toString(), CONCURRENCY_KEY_EXPIRE);

        // Publish a "promotion created" event instead of calling
        // sendPlatformPromotionMessage directly from this method.
        publishSalesPromotionCreatedEvent(salesPromotionDO);

        // Build the response.
        SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO();
        dto.setName(request.getName());
        dto.setType(request.getType());
        dto.setRule(rule);
        dto.setCreateUser(request.getCreateUser());
        dto.setSuccess(true);
        return dto;
    }

    /**
     * Builds the Redis key of the duplicate-submission marker for a request.
     *
     * <p>The parts are concatenated without separators; the format is kept as is so
     * that markers written by already-deployed instances still match.
     *
     * @param request the promotion request; start time and end time must be non-null
     * @return prefix + name + creator + start time (epoch value) + end time (epoch value)
     */
    private String buildConcurrencyKey(SaveOrUpdatePromotionRequest request) {
        return PROMOTION_CONCURRENCY_KEY +
                request.getName() +
                request.getCreateUser() +
                request.getStartTime().getTime() +
                request.getEndTime().getTime();
    }

    /**
     * Publishes a "sales promotion created" event carrying the given promotion.
     *
     * @param salesPromotion the promotion that was just saved
     */
    private void publishSalesPromotionCreatedEvent(SalesPromotionDO salesPromotion) {
        SalesPromotionCreatedEvent salesPromotionCreatedEvent =
                new SalesPromotionCreatedEvent();
        salesPromotionCreatedEvent.setSalesPromotion(salesPromotion);
        String salesPromotionCreatedEventJSON = JsonUtil.object2Json(salesPromotionCreatedEvent);

        defaultProducer.sendMessage(
                RocketMqConstant.SALES_PROMOTION_CREATED_EVENT_TOPIC,
                salesPromotionCreatedEventJSON,
                "发布促销活动创建事件");
    }

    /**
     * Sends the promotion notification for all users as batched user-bucket messages.
     *
     * <p>The id range {@code [1, maxUserId]} is cut into buckets of
     * {@link #USER_BUCKET_SIZE} ids, each described by one message with an inclusive
     * start and an exclusive end, e.g. {@code [1, 1001)}, {@code [1001, 2001)}.
     * Messages are handed to the producer in batches of at most
     * {@link #MESSAGE_BATCH_SIZE}. With one million users this means 1000 bucket
     * messages sent in 10 producer calls.
     *
     * <p>No bucket starts beyond {@code maxUserId}; the last bucket's end may still
     * exceed {@code maxUserId + 1}.
     *
     * @param promotionDO the promotion to announce
     * @throws BaseBizException if the max-user-id query reports failure
     */
    private void sendPlatformPromotionMessage(SalesPromotionDO promotionDO) {
        // 1. The largest user id in the account store stands in for the user count.
        JsonResult<Long> maxUserIdJsonResult = accountApi.queryMaxUserId();
        if (!maxUserIdJsonResult.getSuccess()) {
            throw new BaseBizException(maxUserIdJsonResult.getErrorCode(), maxUserIdJsonResult.getErrorMessage());
        }
        Long maxUserId = maxUserIdJsonResult.getData();
        if (maxUserId == null || maxUserId < 1L) {
            // Nothing to notify; also avoids unboxing a null id.
            log.warn("max user id is {}, no promotion user bucket messages sent", maxUserId);
            return;
        }
        long lastUserId = maxUserId;

        // 2 + 3. Walk the buckets in order and flush every full batch.
        List<String> jsonMessageBatch = new ArrayList<>(MESSAGE_BATCH_SIZE);
        for (long startUserId = 1L; startUserId <= lastUserId; startUserId += USER_BUCKET_SIZE) {
            PlatformPromotionUserBucketMessage message = PlatformPromotionUserBucketMessage.builder()
                    .startUserId(startUserId)
                    .endUserId(startUserId + USER_BUCKET_SIZE)
                    .promotionId(promotionDO.getId())
                    .promotionType(promotionDO.getType())
                    .mainMessage(promotionDO.getName())
                    .message("您已获得活动资格，打开APP进入活动页面")
                    .informType(promotionDO.getInformType())
                    .build();
            jsonMessageBatch.add(JsonUtil.object2Json(message));

            if (jsonMessageBatch.size() == MESSAGE_BATCH_SIZE) {
                sendUserBucketBatch(jsonMessageBatch);
            }
        }

        // Flush the trailing, partially filled batch.
        if (!jsonMessageBatch.isEmpty()) {
            sendUserBucketBatch(jsonMessageBatch);
        }
    }

    /**
     * Sends one batch of user-bucket messages and empties the batch for reuse.
     *
     * @param jsonMessageBatch the serialized messages to send; cleared on return
     */
    private void sendUserBucketBatch(List<String> jsonMessageBatch) {
        defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC,
                jsonMessageBatch, "平台发放促销活动用户桶消息");
        jsonMessageBatch.clear();
    }
}
